package com.fone.flumeExt.channel.bdb;


import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.concurrent.GuardedBy;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.Disposable;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fone.flumeExt.channel.FlumeEvent;
import com.google.common.base.Preconditions;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.collections.CurrentTransaction;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseExistsException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.TransactionConfig;

/**
 * A Flume channel that keeps its events in a Berkeley DB Java Edition (JE)
 * database, accessed through a {@link StoredSortedMap} keyed by a
 * monotonically increasing {@code Long} index.
 *
 * <p>
 * The map is used as a FIFO queue: {@code headIndex} is the next key a take
 * will try to read and {@code tailIndex} is the next key a put will be stored
 * under. The {@code queueStored} semaphore holds one permit per event that is
 * available for taking.
 *
 * @author Phoenics Chow
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
@Disposable
public class JeSortedMapChannel extends BasicChannelSemantics {

	/** Class logger. */
	private static final Logger LOG = LoggerFactory.getLogger(JeSortedMapChannel.class);

	/**
	 * Default number of seconds a take waits for an event when "keep-alive"
	 * is not configured.
	 */
	private static final int DEFAULT_KEEP_ALIVE_SECONDS = 5;

	/**
	 * Simple holder for the JE environment and the database opened in it.
	 *
	 * @author Phoenics Chow
	 */
	private static class JEConf {

		/** The JE environment the database lives in. */
		public BdbJeEnvironment dbEnv = null;

		/** The database that backs the queue map. */
		public Database db = null;
	}

	/**
	 * Channel transaction that buffers puts and remembers the keys of taken
	 * events until the transaction is committed or rolled back.
	 *
	 * @author Phoenics Chow
	 */
	private class MapTransaction extends BasicTransactionSemantics {

		/** Events put in this transaction; written to the map on commit. */
		private final List<FlumeEvent> putList = new ArrayList<>();

		/** Keys of events taken in this transaction; removed from the map on commit. */
		private final List<Long> takeKeyList = new ArrayList<>();

		/** Handle used to begin, commit and abort the JE transaction. */
		private CurrentTransaction currentTransaction;

		/**
		 * Instantiates a new map transaction with empty put and take lists.
		 */
		public MapTransaction() {
		}

		/**
		 * Begins a JE transaction with the default transaction configuration.
		 *
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doBegin()
		 */
		@Override
		protected void doBegin() throws InterruptedException {
			if (currentTransaction == null) {
				currentTransaction = CurrentTransaction.getInstance(jf.dbEnv);
			}
			currentTransaction.beginTransaction(TransactionConfig.DEFAULT);
		}

		/**
		 * Drops the reference to the JE transaction handle.
		 *
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doClose()
		 */
		@Override
		protected void doClose() {
			currentTransaction = null;
		}

		/**
		 * Buffers a copy of the event (headers and body) until commit. A
		 * {@code null} event is ignored and not counted as a put attempt.
		 *
		 * @param event
		 *            the event to enqueue
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doPut(org.apache.flume.Event)
		 */
		@Override
		protected void doPut(Event event) throws InterruptedException {
			if (event == null) {
				return;
			}
			channelCounter.incrementEventPutAttemptCount();
			// The buffer is an unbounded ArrayList, so add() cannot report "full".
			putList.add(new FlumeEvent(event.getHeaders(), event.getBody()));
		}

		/**
		 * Takes the next available event, waiting up to {@code keepAlive}
		 * seconds for one to become available.
		 *
		 * @return the event, or {@code null} if no permit could be acquired
		 *         within the keep-alive period
		 * @throws NullPointerException
		 *             if a permit was acquired but no event was found between
		 *             the head and tail indexes
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doTake()
		 */
		@Override
		protected Event doTake() throws InterruptedException {
			channelCounter.incrementEventTakeAttemptCount();
			if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
				return null;
			}
			Event event = null;
			synchronized (queueLock) {
				// Walk forward from the head, skipping keys that no longer
				// have a record, until an event is found or the tail is reached.
				while (headIndex.get() < tailIndex.get()) {
					long key = headIndex.getAndIncrement();
					event = queueMap.get(key);
					if (event != null) {
						takeKeyList.add(key);
						break;
					}
				}
			}
			Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore signalling existence of entry");
			return event;
		}

		/**
		 * Writes the buffered puts to the map, removes the taken events from
		 * it and commits the JE transaction. Both steps are applied, so a
		 * transaction that contains puts and takes has its takes removed too.
		 *
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doCommit()
		 */
		@Override
		protected void doCommit() throws InterruptedException {
			int puts = putList.size();
			int takes = takeKeyList.size();
			synchronized (queueLock) {
				for (FlumeEvent e : putList) {
					queueMap.put(tailIndex.getAndIncrement(), e);
				}
				// Not an "else": taken events must be deleted even when the
				// same transaction also put events, otherwise they stay in the
				// database although they are reported as successfully taken.
				for (Long takenKey : takeKeyList) {
					queueMap.remove(takenKey);
				}
				currentTransaction.commitTransaction();
			}
			putList.clear();
			takeKeyList.clear();

			if (puts > 0) {
				// Only now do the new events become visible to takers.
				queueStored.release(puts);
				channelCounter.addToEventPutSuccessCount(puts);
			}
			if (takes > 0) {
				channelCounter.addToEventTakeSuccessCount(takes);
			}
			channelCounter.setChannelSize(queueMap.size());
		}

		/**
		 * Makes the events taken in this transaction available again, discards
		 * the buffered puts and aborts the JE transaction.
		 *
		 * @see org.apache.flume.channel.BasicTransactionSemantics#doRollback()
		 */
		@Override
		protected void doRollback() throws InterruptedException {
			synchronized (queueLock) {
				if (!takeKeyList.isEmpty()) {
					long firstTaken = takeKeyList.get(0);
					// Only ever move the head backwards: another transaction
					// may already have rolled it back to a smaller key, and
					// moving it forward again would strand those events.
					if (firstTaken < headIndex.get()) {
						headIndex.set(firstTaken);
					}
					// NOTE(review): keys taken by other still-open transactions
					// that lie after the new head become visible to takers
					// again - confirm whether that is acceptable.
				}
			}
			queueStored.release(takeKeyList.size());
			takeKeyList.clear();
			putList.clear();
			currentTransaction.abortTransaction();
			channelCounter.setChannelSize(queueMap.size());
		}
	}

	/** Lock protecting the head/tail indexes and structural access to the queue map. */
	private final Object queueLock = new Object();

	/** The persistent map used as the event queue. */
	@GuardedBy(value = "queueLock")
	private StoredSortedMap<Long, FlumeEvent> queueMap;

	/** Channel metrics. */
	private ChannelCounter channelCounter;

	/** Directory of the JE environment: the configured base directory plus the channel name. */
	private String dbDir;

	/** The opened JE environment and database. */
	private JEConf jf;

	/** Name of the database inside the JE environment. */
	private final String dbName = "flumeDB";

	/** Head pointer: key of the next event a take will try to read. */
	private final AtomicLong headIndex = new AtomicLong(0);

	/** Tail pointer: key under which the next put will be stored. */
	private final AtomicLong tailIndex = new AtomicLong(0);

	/** Holds one permit per stored event that is available for taking. */
	private Semaphore queueStored;

	/** Seconds a take waits for an event before returning {@code null}. */
	private volatile int keepAlive;

	/**
	 * Instantiates a new sorted map channel. The database is opened later, in
	 * {@link #configure(Context)}.
	 */
	public JeSortedMapChannel() {
	}

	/**
	 * Reads the channel configuration and opens the database and queue map if
	 * they are not open yet.
	 *
	 * <p>
	 * Recognised keys: "bdbdir" (base directory, default
	 * {@code <user.home>/.flume/je-channel/db}; the channel name is appended
	 * to it) and "keep-alive" (take timeout in seconds, default 5).
	 *
	 * @param context
	 *            the channel configuration
	 * @see org.apache.flume.channel.AbstractChannel#configure(org.apache.flume.Context)
	 */
	@Override
	public void configure(Context context) {
		if (channelCounter == null) {
			channelCounter = new ChannelCounter(getName());
		}
		String homePath = System.getProperty("user.home").replace('\\', '/');
		if (getName() == null || getName().equals("")) {
			LOG.warn("channel name is null");
		}
		dbDir = context.getString("bdbdir", homePath + "/.flume/je-channel/db") + "/" + getName();
		try {
			keepAlive = context.getInteger("keep-alive", DEFAULT_KEEP_ALIVE_SECONDS);
		} catch (NumberFormatException e) {
			// An unparsable value falls back to the default.
			keepAlive = DEFAULT_KEEP_ALIVE_SECONDS;
		}
		if (jf == null) {
			jf = createDatabase(this.dbDir, this.dbName);
		}
		if (queueMap == null) {
			queueMap = getSortedMap(jf.db, FlumeEvent.class, jf.dbEnv.getClassCatalog());
		}
	}

	/**
	 * Gets the channel counter.
	 *
	 * @return the channel counter
	 */
	public ChannelCounter getChannelCounter() {
		return channelCounter;
	}

	/**
	 * Starts the counter, publishes the current queue size and starts the
	 * channel.
	 *
	 * @see org.apache.flume.channel.AbstractChannel#start()
	 */
	@Override
	public synchronized void start() {
		channelCounter.start();
		channelCounter.setChannelSize(queueMap.size());
		super.start();
	}

	/**
	 * Publishes the final queue size, stops the counter, closes the JE
	 * environment and stops the channel.
	 *
	 * @see org.apache.flume.channel.AbstractChannel#stop()
	 */
	@Override
	public synchronized void stop() {
		channelCounter.setChannelSize(queueMap.size());
		channelCounter.stop();
		closeDB();
		super.stop();
	}

	/**
	 * Creates a new transaction for this channel.
	 *
	 * @see org.apache.flume.channel.BasicChannelSemantics#createTransaction()
	 */
	@Override
	protected BasicTransactionSemantics createTransaction() {
		return new MapTransaction();
	}

	/**
	 * Opens (creating them if necessary) the transactional JE environment in
	 * {@code dbDir} and the database {@code dbName} inside it.
	 *
	 * <p>
	 * Any exception raised by JE while opening is propagated unchanged. If the
	 * database cannot be opened, the environment that was just opened is
	 * closed again before the exception is rethrown.
	 *
	 * @param dbDir
	 *            the environment home directory
	 * @param dbName
	 *            the database name
	 * @return the opened environment and database
	 */
	private JEConf createDatabase(String dbDir, String dbName) {
		// Database location.
		File envFile = new File(dbDir);
		if (!envFile.exists() && !envFile.mkdirs()) {
			// Opening the environment below reports the actual failure.
			LOG.warn("Could not create database directory " + dbDir);
		}

		// Environment configuration.
		EnvironmentConfig envConfig = new EnvironmentConfig();
		envConfig.setAllowCreate(true);
		envConfig.setTransactional(true);
		envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);

		// Database configuration.
		DatabaseConfig dbConfig = new DatabaseConfig();
		dbConfig.setAllowCreate(true);
		dbConfig.setTransactional(true);

		JEConf jeconf = new JEConf();
		// Create the environment.
		BdbJeEnvironment env = new BdbJeEnvironment(envFile, envConfig);
		try {
			// Open the database.
			jeconf.db = env.openDatabase(null, dbName, dbConfig);
		} catch (RuntimeException e) {
			// Do not leak the environment handle when the database cannot be opened.
			try {
				env.close();
			} catch (RuntimeException closeError) {
				LOG.warn("Failed to close BDB JE environment at " + dbDir + " after open failure", closeError);
			}
			throw e;
		}
		jeconf.dbEnv = env;
		return jeconf;
	}

	/**
	 * Wraps the database in a writable {@link StoredSortedMap} and restores the
	 * queue state from its contents: if records exist, the head index is set
	 * to the first key, the tail index to the last key plus one, and the
	 * semaphore gets one permit per record; otherwise the semaphore starts
	 * with no permits.
	 *
	 * @param db
	 *            the database
	 * @param valueClass
	 *            the value class
	 * @param classCatalog
	 *            the class catalog used by the serial bindings
	 * @return the sorted map
	 */
	private StoredSortedMap<Long, FlumeEvent> getSortedMap(Database db, Class<FlumeEvent> valueClass, StoredClassCatalog classCatalog) {
		// Keys and values are both stored with serial bindings.
		EntryBinding<FlumeEvent> valueBinding = new SerialBinding<FlumeEvent>(classCatalog, valueClass);
		EntryBinding<Long> keyBinding = new SerialBinding<Long>(classCatalog, Long.class);
		StoredSortedMap<Long, FlumeEvent> rt = new StoredSortedMap<Long, FlumeEvent>(db, keyBinding, valueBinding, true /* writable */);
		int stored = rt.size();
		if (stored > 0) {
			headIndex.set(rt.firstKey());
			tailIndex.set(rt.lastKey() + 1L);
			queueStored = new Semaphore(stored);
		} else {
			queueStored = new Semaphore(0);
		}

		return rt;
	}

	/**
	 * Syncs and closes the JE environment, if one was opened. Failures are
	 * logged and not propagated, so that stopping the channel can continue.
	 */
	private void closeDB() {
		if (jf == null) {
			return;
		}
		// NOTE(review): the Database handle in jf.db is not closed explicitly
		// here - confirm that BdbJeEnvironment.close() takes care of it.
		try {
			if (jf.dbEnv != null) {
				jf.dbEnv.sync();
				jf.dbEnv.close();
			}
		} catch (DatabaseException | UnsupportedOperationException e) {
			LOG.error("Failed to close BDB JE environment at " + dbDir, e);
		}
	}
}
